Handmade Machine Learning Algorithms

Below is a collection of Machine Learning algorithms coded from scratch. This project is an exploration of how common machine learning models work under the hood. By implementing these algorithms from scratch in Python, I aim to deepen my understanding of the mathematics powering these techniques, while also visualizing their pros, cons, and differences!

Intro Code

Imports

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statistics import mean

from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

import plotly.io as pio

# Use pio to ensure plotly plots render in Quarto
pio.renderers.default = 'notebook_connected'

Function to Plot Training Data and Regression Plane

Code
import plotly.graph_objects as go

def plot_3d_regression(model, model_name, data_type='linear'):

    # Make standard data
    X, y = make_regression(
        n_samples=300,     # Number of samples
        n_features=2,      # Number of features
        noise=15,          # Add noise to make it realistic
        random_state=42    # Set seed for reproducibility
    )

    # Make standard non-linear data
    if data_type.lower() == 'non-linear':
        y = y + 500 * np.sin(X[:, 0]) + 250 * np.cos(X[:, 1])

    # Make non-standard data
    if data_type == 'ugly':
        # Add non-linear transformations
        y = y + 500 * np.sin(X[:, 0]) * np.cos(X[:, 1]) \
              + 300 * (X[:, 0] ** 2) \
              - 200 * np.exp(-X[:, 1] ** 2)

        # Add random feature interactions
        y = y + 100 * (X[:, 0] * X[:, 1])

        # Add random noise to make it "uglier"
        y = y + np.random.normal(0, 150, size=y.shape)

    model.fit(X, y)

    # Create a mesh grid for the features
    x_grid, y_grid = np.meshgrid(np.linspace(min(X[:, 0]), max(X[:, 0]), 100),
                                np.linspace(min(X[:, 1]), max(X[:, 1]), 100))
    grid = np.vstack((x_grid.flatten(), y_grid.flatten())).T 

    predictions = model.predict(grid)

    # Create 3D scatter plot for training data
    scatter = go.Scatter3d(
        x=X[:, 0], y=X[:, 1], z=y,
        mode='markers', marker=dict(color='blue', size=5), name='Training Data'
    )

    # Create 3D surface plot for the regression surface
    surface = go.Surface(
        x=x_grid, y=y_grid, z=predictions.reshape(x_grid.shape), opacity=0.5, colorscale='Viridis', name='Regression Surface'
    )

    # Combine both traces into one figure
    fig = go.Figure(data=[scatter, surface])

    # Update layout for better visualization
    fig.update_layout(
        title=f'Training Data and Regression Surface for {model_name}',
        scene=dict(
            xaxis_title='Feature 1',
            yaxis_title='Feature 2',
            zaxis_title='Target'
        )
    )

    # Show plot
    fig.show()

Regression Algorithms

OLS Regression

class ols_regression():

    # Initialize the class
    def __init__(self):
        pass       
    
    def fit(self, X, y):
        '''Fit the regression to the X data via the OLS equation'''

        # Add a leading column of 1s to the X data to account for the bias term
        X = np.hstack((np.ones((X.shape[0], 1)), X))

        # Train the data on (X.T @ X)^(-1) @ X.T @ y
        ols = np.linalg.inv(X.T.dot(X)).dot(X.T.dot(y))
        self.coef = ols[1:]
        self.bias = ols[0]

    def predict(self, X):
        '''Predict new data with the trained coefficients and bias'''

        # Check if the X data is 1D and reshape if needed
        if X.ndim == 1:
            X = X.reshape(-1, 1)

        # Make predictions by dotting the new data with the coefficients and adding the bias
        self.predictions = X.dot(self.coef) + self.bias
        
        return self.predictions
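As a quick sanity check of the closed-form math, we can compare the normal-equation solution against NumPy's least-squares solver on some made-up data (a minimal sketch; `np.linalg.lstsq` solves the same problem via a more numerically stable decomposition):

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(50, 2))
y = 3.0 * X[:, 0] - 2.0 * X[:, 1] + 5.0 + rng.normal(scale=0.1, size=50)

# Closed-form OLS: (X'X)^(-1) X'y, with a leading column of 1s for the bias
Xb = np.hstack((np.ones((X.shape[0], 1)), X))
theta = np.linalg.inv(Xb.T @ Xb) @ (Xb.T @ y)

# Reference solution from NumPy's least-squares solver
theta_ref, *_ = np.linalg.lstsq(Xb, y, rcond=None)

print(np.allclose(theta, theta_ref))  # the two solutions agree
```

The first entry of `theta` is the bias and the rest are the coefficients, matching how the class above splits the result.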

For each algorithm, we build two practice datasets on which we fit the algorithm: a linear dataset and an ‘ugly’ dataset that is non-linear and contains significant noise. We then plot the training data and a prediction surface for the trained algorithm to understand how it makes predictions and showcase its ability to ‘understand’ patterns in the underlying data.

ols = ols_regression()

plot_3d_regression(ols, model_name='OLS', data_type='linear')
plot_3d_regression(ols, model_name='OLS', data_type='ugly')

While the OLS regression fits the linear dataset quite well, there is significant error when fitting the non-linear data. As we can see in the prediction surface, this is because the regression can only produce a flat plane, which does not match the structure of the training data.

Gradient Descent Regression

Next, we build a linear regression via gradient descent. Unlike OLS, which is a closed-form solution, gradient descent operates by calculating the gradient of the loss function (MSE) for any given set of regression weights, and systematically changing the weights in order to ‘move down’ the gradient and minimize the loss of predictions.

While the final result should be the same as OLS, the two do not always align exactly: gradient descent starts from a random initialization and takes a finite number of steps, so it only approximates the minimum of the loss function (although predictions are generally close enough). That said, gradient descent is a good option for larger datasets due to its lower computational requirements.
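The update rule can be checked on a tiny one-feature problem (made-up data, step size, and epoch count): after enough epochs, the gradient-descent estimates should land on the closed-form solution.

```python
import numpy as np

rng = np.random.default_rng(1)
X = rng.normal(size=(100, 1))
y = 4.0 * X[:, 0] + 2.0 + rng.normal(scale=0.5, size=100)

# Full-batch gradient descent on the MSE loss
bias, coef = 0.0, np.zeros(1)
eta = 0.05
for _ in range(2000):
    y_hat = X @ coef + bias
    d_bias = 2 / len(y) * np.sum(y_hat - y)   # dMSE/d(bias)
    d_coef = 2 / len(y) * X.T @ (y_hat - y)   # dMSE/d(coef)
    bias -= eta * d_bias
    coef -= eta * d_coef

# Closed-form solution for comparison
Xb = np.hstack((np.ones((len(y), 1)), X))
theta = np.linalg.lstsq(Xb, y, rcond=None)[0]
print(bias, coef[0], theta)  # the two estimates coincide
```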

class GDRegression():
    '''Simple class for performing linear regression via gradient descent. Note: this is a simple execution and does not include options for 
       regularization'''
    def __init__(self, epochs, eta):
        '''Initialize the Gradient Descent Regression Class'''
        self.epochs = epochs
        self.eta = eta

    def fit(self, X, y, batch_size=None):
        '''Train the Gradient Descent Regression Class'''

        # Default to full-batch gradient descent
        if batch_size is None:
            batch_size = X.shape[0]


        # Create random initialization for the bias and coefficients
        bias = np.random.random()
        coef = np.random.random(X.shape[1])

        # Iterate through each epoch
        for _ in range(self.epochs):

            # Sample a random batch of rows (the full dataset by default)
            indices = np.random.choice(X.shape[0], size=min(batch_size, len(y)), replace=False)
            X_batch = X[indices]
            y_batch = y[indices]

            # Make predictions for the X data being trained on
            y_hat = X_batch.dot(coef) + bias

            # Calculate the derivative WRT bias and coef given the predictions
            derr_b = 2 / X_batch.shape[0] * np.sum(y_hat - y_batch)
            derr_c = 2 / X_batch.shape[0] * X_batch.T.dot(y_hat - y_batch)

            # Update the bias and the coef based on the derivative
            bias = bias - derr_b * self.eta
            coef = coef - derr_c * self.eta

        # Finalize the bias and coef
        self.bias = bias
        self.coef = coef

    def predict(self, X):
        '''Predict new data given the learned bias and coef'''
        predictions = X.dot(self.coef) + self.bias
        return predictions

        
gd_reg = GDRegression(epochs=10000, eta=.01)
plot_3d_regression(gd_reg, 'Gradient Descent', data_type='linear')
plot_3d_regression(gd_reg, 'Gradient Descent', data_type='ugly')

As expected, the results for the gradient descent regression are effectively the same as for the OLS! The prediction surface fits the linear dataset very well, but fails to capture the non-linear nature of the ‘ugly’ data.

KNN Regression

We next build a K-Nearest Neighbors regressor. K-Nearest Neighbors works by taking the K closest points (by distance) to any point in question and predicting the average of those K points’ targets as the output. This relies on the assumption that points close together are similar in nature.
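As a toy illustration (hand-picked numbers), the prediction for a query point is just the mean target of its k closest training points:

```python
import numpy as np

X_train = np.array([[0.0], [1.0], [2.0], [10.0], [11.0]])
y_train = np.array([1.0, 2.0, 3.0, 20.0, 22.0])

def knn_predict(x, k=3):
    # Euclidean distance from the query point to every training point
    distances = np.linalg.norm(X_train - x, axis=1)
    # Average the targets of the k nearest neighbors
    nearest = np.argsort(distances)[:k]
    return y_train[nearest].mean()

print(knn_predict(np.array([1.5])))   # neighbors 0, 1, 2 -> mean(1, 2, 3) = 2.0
print(knn_predict(np.array([10.5])))  # neighbors 10, 11, 2 -> mean(20, 22, 3) = 15.0
```

Note in the second query how a far-away neighbor (the point at x = 2) can still get pulled into the average when k exceeds the size of the local cluster.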

class KNNRegressor():
    '''A simple class for performing Nearest-Neighbors Regression. Note: this is a simple execution, and does not include options for
       distance metrics beyond Euclidean distance.'''

    def __init__(self, n_neighbors=5):
        '''Initialize the regressor with a defined number of nearest neighbors'''
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        '''Train the regressor by loading in all X and y data'''
        self.X = X
        self.y = y

    def predict(self, X):
        '''Make predictions based on the training data using Euclidean distance'''
        predictions = []

        # For each test point...
        for test_point in X:
            # Calculate the distance between the test point and all training points
            distances = np.linalg.norm(self.X - test_point, axis=1)

            # Find the n_neighbors closest points
            closest_points_indices = np.argsort(distances)[:self.n_neighbors]

            # Use the mean of the closest points as the prediction
            predictions.append(np.mean(self.y[closest_points_indices]))

        return np.array(predictions)

knn_regressor = KNNRegressor()

plot_3d_regression(knn_regressor, "K-Nearest Neighbors Regression", data_type='linear')
plot_3d_regression(knn_regressor, "K-Nearest Neighbors Regression", data_type='ugly')

We first see that unlike the linear regressions, our prediction surfaces are now bumpy! This is because each prediction depends only on the K nearest points, so the surface varies locally rather than following a single plane. The next big change versus the previous linear regressions is that the KNN regressor fits the non-linear data much better. However, we still see some issues in fitting the very noisy data points well, as they break the underlying assumption that points close together are similar.

Decision Tree Regressor

We next build a decision tree regressor. Decision Tree Regression splits the data into subsets based on feature values to create a tree-like model of decisions. At each split, the algorithm chooses the feature and threshold that minimize the error in predicting the target variable.

The model predicts the target value of a new data point by following the tree’s splits until reaching a leaf, where the output is the average of the target values in that leaf.
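The split criterion can be demonstrated directly: for a single feature, try each observed value as a threshold and keep the one that minimizes the weighted MSE of the two sides (a minimal sketch on hand-picked data with two obvious clusters):

```python
import numpy as np

x = np.array([1.0, 2.0, 3.0, 10.0, 11.0, 12.0])
y = np.array([5.0, 6.0, 5.5, 20.0, 21.0, 19.5])

def weighted_mse(thresh):
    left, right = y[x <= thresh], y[x > thresh]
    if len(left) == 0 or len(right) == 0:
        return np.inf
    # Weighted average of each side's variance around its own mean
    return (np.var(left) * len(left) + np.var(right) * len(right)) / len(y)

best = min(np.unique(x), key=weighted_mse)
print(best)  # 3.0 -- the threshold that cleanly separates the two clusters
```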

class DecisionTreeRegressor():
    '''A class to build a decision tree regressor. Note this is a simple execution and does not include common parameters for regularization
       other than max_depth'''
    
    def __init__(self, max_depth=None):
        # Initializes the decision tree regressor with an optional maximum depth for the tree.
        self.max_depth = max_depth

    # Function for calculating the Mean Squared Error (MSE) of a split
    def mse(self, y):
        # MSE is calculated as the average of squared differences from the mean
        return np.mean((y - np.mean(y)) ** 2)
    
    # Function to find the best split at any given point (based on MSE)
    def _best_split(self, X, y):
        # Initialize variables to track the best split found
        self.best_mse = float('inf')  # Best MSE starts as infinity
        self.best_feature = None
        self.best_split_value = None
        self.left_y = None
        self.right_y = None

        # Loop over each feature in the dataset
        for feature_num in range(X.shape[1]):
            # Get unique values in the feature column to test potential splits
            feature_values = np.unique(X[:, feature_num])
            
            # For each unique value, try splitting the data
            for value in feature_values:
                # Find indices where the feature values are less than or equal to the split value
                left_index = X[:, feature_num] <= value
                right_index = X[:, feature_num] > value

                # Split the target values accordingly
                left_targets = y[left_index]
                right_targets = y[right_index]

                # Proceed only if both splits result in non-empty groups
                if len(left_targets) > 0 and len(right_targets) > 0:
                    # Compute MSE for both the left and right splits
                    left_mse = self.mse(left_targets)
                    right_mse = self.mse(right_targets)
                    
                    # Calculate the weighted average MSE of the two splits
                    total_average_mse = left_mse * len(left_targets)/len(y) + right_mse * len(right_targets)/len(y)

                    # If this split provides a better (lower) MSE, update the best split found
                    if total_average_mse < self.best_mse:
                        self.best_mse = total_average_mse
                        self.best_feature = feature_num
                        self.best_split_value = value
                        self.left_y = left_targets
                        self.right_y = right_targets

        # Return the best split information (feature index, value, and target splits)
        return self.best_split_value, self.best_feature, self.left_y, self.right_y
    
    # Function to recursively build the decision tree
    def _build_tree(self, X, y, depth=0):
        # Base case: If all targets are the same or max depth is reached, return the mean of the target values
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return np.mean(y)
        
        # Find the best split for the data
        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)

        # If no valid split was found, return the mean of the targets
        if best_feature is None:
            return np.mean(y)
        
        # Split the data based on the best feature and split value
        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value

        # Recursively build the left and right subtrees
        left_tree = self._build_tree(X[left_index], left_y, depth + 1)
        right_tree = self._build_tree(X[right_index], right_y, depth + 1)

        # Return the current node as a dictionary with the best split and its left and right subtrees
        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree
        }
    
    # Function to make a prediction for a single sample using the tree
    def _single_prediction(self, tree, x):
        # If the current tree node is a dictionary (not a leaf), recursively traverse the tree
        if isinstance(tree, dict):
            if x[tree['feature']] <= tree['value']:
                return self._single_prediction(tree['left'], x)  # Go left
            else:
                return self._single_prediction(tree['right'], x)  # Go right
        # If the current node is a leaf (not a dictionary), return the prediction (mean of targets)
        else:
            return tree
        
    # Function to predict target values for a set of samples
    def predict(self, X):
        # For each sample in X, make a prediction by recursively traversing the tree
        predictions = np.array([self._single_prediction(self.tree, x) for x in X])
        return predictions

    # Function to fit the decision tree to the training data
    def fit(self, X, y):
        # Build the tree by calling the recursive function with the training data
        self.tree = self._build_tree(X, y)

dt_reg = DecisionTreeRegressor()
plot_3d_regression(dt_reg, "Decision Tree", data_type='linear')
plot_3d_regression(dt_reg, "Decision Tree", data_type='ugly')

With the Decision Tree regressor, similar to the KNN regressor, we are able to fit both the linear and non-linear datasets well. The regression surface does, however, look a bit different: there are sharper horizontal cuts rather than an overall bumpy surface. This is caused by the ‘cutting’ of the dataset on feature values during training, which segments the data sharply.

Random Forest Regression

We next build a Random Forest Regressor. Random Forest Regression improves upon Decision Trees by combining the predictions of multiple trees to make a more accurate and stable model. Each tree is trained on a random subset of the data and features, and the final prediction is the average of all the trees’ outputs.
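The two ingredients — bootstrap sampling of rows and averaging across learners — can be sketched on their own, using a trivial "predict the sample mean" learner as a stand-in for a full tree (toy data; the class below fits an actual decision tree to each resample):

```python
import numpy as np

rng = np.random.default_rng(42)
y = np.arange(10, dtype=float)   # toy targets 0..9, full-sample mean 4.5

n_trees = 5
bootstrap_means = []
for _ in range(n_trees):
    # Each "tree" sees a bootstrap resample: rows drawn with replacement
    idx = rng.integers(0, len(y), size=len(y))
    bootstrap_means.append(y[idx].mean())   # stand-in for a fitted tree

# The ensemble prediction averages over all resamples
ensemble = np.mean(bootstrap_means)
print(bootstrap_means, ensemble)
```

Each resample gives a slightly different estimate, and averaging them smooths out that variance — the same mechanism that makes a forest's surface smoother than a single tree's.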

class RandomForestRegressor():
    def __init__(self, n_estimators, max_depth=None):
        '''A class to build a Random Forest Regressor. Note this is a simple execution and does not include parameters for regularization
           other than n_estimators and max_depth'''
        
        # Initializes the random forest regressor with the number of estimators (trees) and an optional maximum depth for each tree.
        self.n_estimators = n_estimators
        self.max_depth = max_depth

    # Function for calculating the Mean Squared Error (MSE) of a split
    def mse(self, y):
        # MSE is calculated as the average of squared differences from the mean
        return np.mean((y - np.mean(y)) ** 2)
    
    # Function to find the best split at any given point (based on MSE)
    def _best_split(self, X, y):
        # Initialize variables to track the best split found
        self.best_mse = float('inf')  # Best MSE starts as infinity
        self.best_feature = None
        self.best_split_value = None
        self.left_y = None
        self.right_y = None

        # Randomly sample 1/3 of the total features to consider for splitting
        n_features_to_consider = max(1, X.shape[1] // 3)
        random_feature_indices = np.random.choice(X.shape[1], size=n_features_to_consider, replace=False)

        # Only consider the randomly selected features for splitting
        feature_subset = X[:, random_feature_indices]

        # Loop through the randomly selected features and find the best split
        for i, feature_num in enumerate(random_feature_indices):  # Map back to original feature indices
            feature_values = np.unique(feature_subset[:, i])
            for value in feature_values:
                # Boolean indices based on the feature subset
                left_index = feature_subset[:, i] <= value
                right_index = feature_subset[:, i] > value

                # Subset targets using these indices
                left_targets = y[left_index]
                right_targets = y[right_index]

                # Ensure both sides have samples
                if len(left_targets) > 0 and len(right_targets) > 0:
                    # Calculate the MSE for both the left and right subsets
                    left_mse = self.mse(left_targets)
                    right_mse = self.mse(right_targets)
                    total_average_mse = left_mse * len(left_targets) / len(y) + right_mse * len(right_targets) / len(y)

                    # If this split results in a better (lower) MSE, update the best split
                    if total_average_mse < self.best_mse:
                        self.best_mse = total_average_mse
                        self.best_feature = feature_num
                        self.best_split_value = value
                        self.left_y = left_targets
                        self.right_y = right_targets

        # Return the best split information (feature index, value, and target splits)
        return self.best_split_value, self.best_feature, self.left_y, self.right_y

    
    # Function to recursively build the decision tree
    def _build_tree(self, X, y, depth=0):
        # Base case: If all targets are the same or max depth is reached, return the mean of the target values
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return np.mean(y)
        
        # Find the best split for the data
        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)

        # If no valid split was found, return the mean of the targets
        if best_feature is None:
            return np.mean(y)
        
        # Split the data based on the best feature and split value
        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value

        # Recursively build the left and right subtrees
        left_tree = self._build_tree(X[left_index], left_y, depth + 1)
        right_tree = self._build_tree(X[right_index], right_y, depth + 1)

        # Return the current node as a dictionary with the best split and its left and right subtrees
        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree
        }

    # Function to make a prediction for a single sample using the tree
    def _single_prediction(self, tree, x):
        # If the current tree node is a dictionary (not a leaf), recursively traverse the tree
        if isinstance(tree, dict):
            if x[tree['feature']] <= tree['value']:
                return self._single_prediction(tree['left'], x)  # Go left
            else:
                return self._single_prediction(tree['right'], x)  # Go right
        # If the current node is a leaf (not a dictionary), return the prediction (mean of targets)
        else:
            return tree
        
    # Function to predict target values for a set of samples
    def predict(self, X):
        predictions = []
        # For each tree in the random forest, make predictions for all samples in X
        for tree in self.trees:
            model_predictions = np.array([self._single_prediction(tree, x) for x in X])
            predictions.append(model_predictions)
        
        # Combine all the predictions of all the trees and average across the trees
        all_predictions = np.column_stack(predictions)
        all_predictions = np.mean(all_predictions, axis=1)

        return all_predictions
        

    # Function to train the random forest model
    def fit(self, X, y):
        models = []
        # Create n decision trees, each trained with bootstrapping (sampling with replacement)
        for n in range(self.n_estimators):
            random_row_indices = np.random.choice(len(X), len(X), replace=True)
            subset_X = X[random_row_indices]
            subset_y = y[random_row_indices]
            tree = self._build_tree(subset_X, subset_y)

            # Add each trained tree to the list of models
            models.append(tree)

        # Save all the trained trees
        self.trees = models

rf_regressor = RandomForestRegressor(10)
plot_3d_regression(rf_regressor, "Random Forest Regressor", "linear")
plot_3d_regression(rf_regressor, "Random Forest Regressor", "ugly")

As expected, the Random Forest regressor fits both datasets quite well, and its regression surface is similar to the Decision Tree’s, although a bit smoother. Random Forest’s advantage over a single Decision Tree is likely to grow as the underlying dataset becomes larger and more complex.

Gradient Boosting Regression

For our final regression method, we build a Gradient Boosting Regressor. Gradient Boosting Regression builds a model by combining many small, weak decision trees, each one correcting the errors of the previous trees. It works iteratively, adding trees that focus on reducing the residual errors in the predictions. The final prediction is the initial estimate plus the sum of all the trees’ scaled outputs.
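The core loop — fit a weak learner to the current residuals, add a scaled copy of its prediction, repeat — can be sketched with the simplest useful learner, a depth-1 stump (hand-picked data; the class below uses full decision trees instead):

```python
import numpy as np

x = np.array([1.0, 2.0, 3.0, 4.0])
y = np.array([3.0, 5.0, 9.0, 11.0])
lr = 0.5

def fit_stump(x, y):
    '''Return the best single-split ("depth-1 tree") predictor for y.'''
    best = None
    for t in x[:-1]:
        left, right = y[x <= t], y[x > t]
        err = ((left - left.mean()) ** 2).sum() + ((right - right.mean()) ** 2).sum()
        if best is None or err < best[0]:
            best = (err, t, left.mean(), right.mean())
    _, t, lo, hi = best
    return lambda q: np.where(q <= t, lo, hi)

pred = np.full_like(y, y.mean())    # start from the mean of the targets
for _ in range(30):
    stump = fit_stump(x, y - pred)  # fit a weak learner to the residuals
    pred = pred + lr * stump(x)     # add a scaled correction

print(np.round(pred, 2))  # close to y itself after enough rounds
```

Each round shrinks the residuals, so even a learner this weak eventually reconstructs the targets; the learning rate controls how cautiously each correction is applied.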

class GradientBoostingRegressor:
    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3):
        '''A class for building a Gradient Boosting Regressor. Note this is a simple execution and does not include parameters for 
           regularization other than n_estimators, learning_rate, and max_depth'''

        # Initialize model with given number of estimators, learning rate, and max depth for each tree
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth

    def fit(self, X, y):
        # Store the initial prediction (the mean of the target values)
        self.initial_prediction = np.mean(y)
        self.y_pred = np.full(y.shape, self.initial_prediction, dtype=np.float64)
        self.trees = []

        for _ in range(self.n_estimators):
            # Calculate residuals
            residuals = y - self.y_pred

            # Fit a tree to the residuals
            tree = DecisionTreeRegressor(max_depth=self.max_depth)
            tree.fit(X, residuals)

            # Update predictions with the tree's output scaled by learning rate
            self.y_pred += self.learning_rate * tree.predict(X)
            self.trees.append(tree)

    def predict(self, X):
        # Start from the same initial prediction used during training
        y_pred = np.full(X.shape[0], self.initial_prediction, dtype=np.float64)

        # Sum the scaled residual predictions from all trees
        for tree in self.trees:
            y_pred += self.learning_rate * tree.predict(X)
        return y_pred

boost_regressor = GradientBoostingRegressor(n_estimators=50)
plot_3d_regression(boost_regressor, "Gradient Boost Regressor", "linear")
plot_3d_regression(boost_regressor, "Gradient Boost Regressor", "ugly")

We once again see a very strong fit to the training data both in the linear and non-linear cases from the Gradient Boosting Regressor. Additionally, as compared to previous non-linear methods, the surface is a fair bit smoother. There are a few large, sharp cuts that represent overfitting, but these can be combatted with regularization techniques not included in this simple execution!